package com.syj.qdp.framework.redis.config;

import cn.hutool.system.SystemUtil;
import com.syj.qdp.framework.redis.core.pubsub.AbstractChannelMessageListener;
import com.syj.qdp.framework.redis.core.pubsub.ChannelMessage;
import com.syj.qdp.framework.redis.core.stream.AbstractStreamMessageListener;
import com.syj.qdp.framework.redis.core.stream.StreamMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.ObjectRecord;
import org.springframework.data.redis.connection.stream.ReadOffset;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;

import java.util.List;

/**
 * Spring configuration that wires up Redis-backed messaging:
 * a pub/sub listener container for {@link AbstractChannelMessageListener} beans and
 * a Redis Stream listener container for {@link AbstractStreamMessageListener} beans.
 *
 * @author Lyon
 */
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
@Slf4j
public class RedisMQAutoConfiguration {

    /**
     * Creates the pub/sub listener container and subscribes every channel listener to its own channel.
     *
     * @param redisConnectionFactory connection factory the container uses to subscribe
     * @param messageListeners       all channel listener beans found in the context
     * @return the container with every listener registered on the topic returned by its {@code getChannel()}
     */
    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnBean(AbstractChannelMessageListener.class)
    RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory redisConnectionFactory,
                                                                @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") List<AbstractChannelMessageListener<? extends ChannelMessage>> messageListeners) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(redisConnectionFactory);
        for (AbstractChannelMessageListener<? extends ChannelMessage> listener : messageListeners) {
            container.addMessageListener(listener, new ChannelTopic(listener.getChannel()));
            log.info("[redisMessageListenerContainer][注册 Channel({}) 对应的监听器({})]",
                    listener.getChannel(), listener.getClass().getName());
        }
        return container;
    }

    /**
     * Creates the Redis Stream listener container and registers every stream listener as a
     * consumer-group consumer on its stream.
     *
     * <p>The bean is only created when at least one {@link AbstractStreamMessageListener} bean exists,
     * because those are the listeners it injects and registers.
     *
     * @param redisTemplate          template used to create consumer groups; also handed to each listener
     * @param redisConnectionFactory connection factory the container polls with
     * @param listeners              all stream listener beans found in the context
     * @return the container with one read request registered per listener; started and stopped through
     *         the bean's {@code initMethod}/{@code destroyMethod}
     */
    @Bean(initMethod = "start", destroyMethod = "stop")
    @ConditionalOnMissingBean
    @ConditionalOnBean(AbstractStreamMessageListener.class)
    StreamMessageListenerContainer<String, ObjectRecord<String, String>> streamMessageListenerContainer(RedisTemplate<String, Object> redisTemplate,
                                                                                                        RedisConnectionFactory redisConnectionFactory,
                                                                                                        @SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection") List<AbstractStreamMessageListener<? extends StreamMessage>> listeners) {
        // Build the StreamMessageListenerContainer.
        StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, String>> containerOptions = StreamMessageListenerContainer
                .StreamMessageListenerContainerOptions
                .builder()
                // Number of records fetched per poll.
                .batchSize(10)
                // Record values are read as String; listeners deserialize them on their own.
                .targetType(String.class)
                .build();
        StreamMessageListenerContainer<String, ObjectRecord<String, String>> container = StreamMessageListenerContainer
                .create(redisConnectionFactory, containerOptions);

        // Identifier of this consumer, shared by every listener registered below.
        String consumerName = buildConsumerName();
        for (AbstractStreamMessageListener<? extends StreamMessage> listener : listeners) {
            String group = listener.getGroup();
            String streamKey = listener.getStreamKey();
            // A consumer group has to exist before it can be read from. Consumer groups let several
            // consumers cooperate on one stream, each handling a share of the messages, which helps
            // when a single consumer cannot keep up.
            try {
                redisTemplate.opsForStream().createGroup(streamKey, group);
            } catch (Exception ignored) {
                // Best effort: creation is presumably rejected when the group already exists (e.g. after
                // a restart), so this must not abort start-up. Log it so that real failures stay visible.
                log.info("[streamMessageListenerContainer][create group({}) on stream({}) skipped: {}]",
                        group, streamKey, ignored.getMessage());
            }
            // Give the listener the template it needs for its Redis operations.
            listener.setRedisTemplate(redisTemplate);
            // Consumer = consumer group + identifier of this consumer.
            Consumer consumer = Consumer.from(group, consumerName);
            // Resume from the last position consumed by the group.
            StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
            StreamMessageListenerContainer.ConsumerStreamReadRequest<String> streamReadRequest = StreamMessageListenerContainer
                    .StreamReadRequest
                    .builder(streamOffset)
                    .consumer(consumer)
                    // Acknowledge manually.
                    .autoAcknowledge(false)
                    // By default an error cancels the subscription, which is not wanted here; keep consuming.
                    .cancelOnError(throwable -> false)
                    .build();
            container.register(streamReadRequest, listener);
        }
        return container;
    }

    /**
     * Builds the identifier of the current consumer.
     *
     * @return the identifier formatted as {@code <host address>@<process id>}
     */
    private static String buildConsumerName() {
        return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
    }

}
